package org.xqh.study.google.guava.concurrent;

import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.SortedSetMultimap;
import com.google.common.collect.TreeMultimap;
import com.google.common.util.concurrent.*;
import org.junit.Test;

import javax.annotation.Nullable;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 并发工具集
 * 1、MapMaker
 * 2、Monitor
 * 3、ListenableFuture
 * 4、RateLimiter
 * 5、AbstractScheduledService
 * 6、AbstractExecutionThreadService
 * 7、ServiceManager
 */
public class ConcurrentTest {

    //MapMaker Concurrent Map 的神器。
    //MapMaker融合了Weak Reference，线程安全，高并发性能，异步超时清理，自定义构建元素等强大功能于一身。
    @Test
    public void testMapMaker() throws InterruptedException {
        ConcurrentMap<String, Object> map = new MapMaker()
                //允许的并发数
                .concurrencyLevel(2)
                //初始大小
                .initialCapacity(4)
                //使用弱引用存储键。当键没有其它（强或软）引用时，缓存项可以被垃圾回收。
                .weakKeys()
                //使用弱引用存储值。当值没有其它（强或软）引用时，缓存项可以被垃圾回收。
                .weakValues()
                //build出来ConcurrentMap
                .makeMap();

        String key = "key";
        Object value = new Object();

        map.put(key, value);

        key = null; // 使 key 变成了WeakReference
        value = null;// 使 value 变成了WeakReference

        System.gc();// 触发垃圾回收
        TimeUnit.SECONDS.sleep(1L);

        System.out.println(map.get("key"));
        System.out.println(map.keySet().size());
        System.out.println(map.values().size());
    }

    //Monitor类是作为ReentrantLock的一个替代，代码中使用 Monitor比使用ReentrantLock更不易出错，可读性也更强，并且也没有显著的性能损失，使用Monitor甚至有潜在的性能得到优化。
    @Test
    public void testMonitor() throws InterruptedException {
        final MonitorExample monitorDemo = new MonitorExample();
        for (int i = 0; i < 5; i++) {
            new Thread() {
                @Override
                public void run() {
                    for (int count = 0; count < 10; count++) {
                        try {
                            //monitorDemo.addToListWait(count + "--->" + Thread.currentThread().getName());
                            monitorDemo.addToListSkipWait(count + "--->" + Thread.currentThread().getName());
                            Thread.sleep(100L);
                        } catch (Exception e) {
                            System.out.println(e);
                        }
                    }
                }
            }.start();
        }

        // 等待所有线程执行完毕
        TimeUnit.SECONDS.sleep(2);

        System.out.println("---------------------------------------- 我是分割线 ---------------------------------------");
        Iterator iteratorStringList = monitorDemo.list.iterator();
        while (iteratorStringList.hasNext()) {
            System.out.println(iteratorStringList.next());
        }
    }

    // ListenableFuture顾名思义就是可以监听的Future，它是对java原生Future的扩展增强。
    // 我们知道Future表示一个异步计算任务，当任务完成时可以得到计算结果。
    // 如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算，就必须使用另一个线程不断的查询计算状态。这样做，代码复杂，而且效率低下。
    // 而Guava的 ListenableFuture 使用观察者模式帮我们检测Future是否完成了，如果完成就自动调用回调函数，这样可以减少并发程序的复杂度。
    @Test
    public void testListenableFuture() throws InterruptedException {
        /*
        1、MoreExecutors 该类是final类型的工具类，提供了很多静态方法。例如listeningDecorator方法初始化ListeningExecutorService方法。
        2、ListeningExecutorService 该类是对ExecutorService的扩展，重写ExecutorService类中的submit方法，返回ListenableFuture对象。
        3、ListenableFuture 该接口扩展了Future接口，增加了addListener方法，该方法在给定的excutor上注册一个监听器，当计算完成时会马上调用该监听器。不能够确保监听器执行的顺序，但可以在计算完成时确保马上被调用。
        4、FutureCallback 该接口提供了OnSuccess和OnFailuren方法。获取异步计算的结果并回调。上面的监听是没有取得返回值的情况，利用callback监听可以取得返回值。
        5、Futures 该类提供和很多实用的静态方法以供使用。
            Futures.allAsList这个方法用来把多个ListenableFuture组合成一个。
            Futures.transform[Async]这个方法用于转换返回值，有同步和异步。
        */

        System.out.println("main start thread=" + Thread.currentThread().getName());

        // 创建工作线程池
        ExecutorService service = Executors.newFixedThreadPool(10);

        // 创建 ListeningExecutorService
        ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(service);

        // 子线程任务1
        ListenableFuture<Integer> listenableFutureInteger = listeningExecutorService.submit(() -> {
            System.out.println("integerTask start=" + Thread.currentThread().getName());
            Thread.sleep(100);
            System.out.println("integerTask end=" + Thread.currentThread().getName());
            return new Random().nextInt(100);
        });

        /* 对任务1添加监听方式一：此方式不能监听失败，不推荐，建议使用下面的
        listenableFutureInteger.addListener(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("get listenable future's result " + listenableFutureInteger.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }, listeningExecutorService);*/

        // 对任务1添加监听方式二
        Futures.addCallback(listenableFutureInteger, new FutureCallback<Integer>() {
            @Override
            public void onSuccess(@Nullable Integer result) {
                System.out.println("integerTask done " + "result=" + result + " thread=" +Thread.currentThread().getName());
            }
            @Override
            public void onFailure(Throwable t) {
                t.printStackTrace();
            }
        }, listeningExecutorService);



        // 子线程任务2
        ListenableFuture<String> listenableFutureString = listeningExecutorService.submit(() -> {
            System.out.println("stringTask start=" + Thread.currentThread().getName());
            Thread.sleep(50);
            System.out.println("stringTask end=" + Thread.currentThread().getName());
            return " ok";
        });

        // 对任务2添加监听
        Futures.addCallback(listenableFutureString, new FutureCallback<String>() {
            @Override
            public void onSuccess(@Nullable String result) {
                System.out.println("stringTask done " + "result=" + result + " thread=" +Thread.currentThread().getName());
            }
            @Override
            public void onFailure(Throwable t) {
                t.printStackTrace();
            }
        }, listeningExecutorService);

        //用来把多个ListenableFuture组合成一个。
        //Futures.allAsList(listenableFutureInteger, listenableFutureString);

        // 主线程其他任务
        for (int i = 0 ;i < 4 ;i++){
            System.out.println("main other task " + i + " thread=" + Thread.currentThread().getName());
            Thread.sleep(50);
        }

        // 关闭线程池
        listeningExecutorService.shutdown();
        System.out.println("main end thread=" + Thread.currentThread().getName());
    }

    //RateLimiter(速率限制器)，类似于JDK的信号量Semphore，他用来限制对资源并发访问的线程数。
    //Semphore从线程数量限流，RateLimiter从请求速率限流
    //Guava RateLimiter基于令牌桶(token bucket)算法。
    //我们只需要告诉RateLimiter系统限制的QPS是多少，那么RateLimiter将以这个速度往桶里面放入令牌，然后请求的时候，通过tryAcquire()方法向RateLimiter获取许可（令牌）。
    @Test
    public void testRateLimiter() throws InterruptedException {
        /*
        RateLimiter具体有两个实现类，分别是SmoothBursty以及SmoothWarmingUp。
            SmoothBursty：初始化的storedPermits为0，可以支持burst到maxPermits
            SmoothWarmingUp：初始化的storedPermits为maxPermits(thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros))，也支持burst，但是总体相对平滑。
         */

        //根据指定的稳定吞吐率(permitsPerSecond)和预热期来(warmupPeriod)创建RateLimiter。
        //这里的吞吐率是指每秒多少许可数（通常是指QPS，每秒多少个请求量），
        //在这段预热时间内，RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。（只要存在足够请求数来使其饱和）
        RateLimiter rateLimiter = RateLimiter.create(10, 2, TimeUnit.SECONDS);

        ExecutorService executorService = Executors.newFixedThreadPool(200);

        //排序好的map，后面打印出来看起来方便
        SortedSetMultimap<String, Integer> multimap = TreeMultimap.create();
        AtomicInteger count = new AtomicInteger(0);

        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    //获取令牌桶中一个令牌，最多等待10秒
                    if (rateLimiter.tryAcquire(1, 10, TimeUnit.SECONDS)) {
                        String key = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                        System.out.println("当前时间秒值 = " + key);

                        multimap.put(key, count.addAndGet(1));
                    }
                }
            });
        }

        System.out.println(multimap);

        Thread.sleep(10000);

        //System.out.println(multimap);

        multimap.keySet().forEach(key -> {
            System.out.println(key + "=" + multimap.get(key));
        });

        executorService.shutdown();
    }

    /*
    Guava包里的Service接口用于封装一个服务对象的运行状态、包括start和stop等方法。
    例如web服务器，RPC服务器、计时器等可以实现这个接口。
    对此类服务的状态管理并不轻松、需要对服务的开启/关闭进行妥善管理、特别是在多线程环境下尤为复杂。
    Guava包提供了一些基础类帮助你管理复杂的状态转换逻辑和同步细节。
     */
    //AbstractScheduledService
    //AbstractScheduledService类用于在运行时处理一些周期性的任务。子类可以实现 runOneIteration()方法定义一个周期执行的任务，以及相应的startUp()和shutDown()方法。
    // 为了能够描述执行周期，你需要实现scheduler()方法。通常情况下，你可以使用AbstractScheduledService.Scheduler类提供的两种调度器：newFixedRateSchedule(initialDelay, delay, TimeUnit) 和newFixedDelaySchedule(initialDelay, delay, TimeUnit)。
    // 类似于JDK并发包中ScheduledExecutorService类提供的两种调度方式。如要自定义schedules则可以使用 CustomScheduler类来辅助实现。
    @Test
    public void testAbstractScheduledService() {
        // 定义AbstractScheduledServiceImpl对象
        AbstractScheduledService service = new AbstractScheduledService() {
            @Override
            protected void startUp() throws Exception {
                //TODO: 做一些初始化操作
            }

            @Override
            protected void shutDown() throws Exception {
                //TODO: 可以做一些清理操作，比如关闭连接啥的。shutDown() 是在线程的具体实现里面调用的
            }

            @Override
            protected void runOneIteration() throws Exception {
                // 每次周期任务的执行逻辑
                try {
                    System.out.println(LocalDateTime.now() + " do work....");
                } catch (Exception e) {
                    //TODO: 处理异常，这里如果抛出异常，会使服务状态变为failed同时导致任务终止。
                }
            }

            @Override
            protected Scheduler scheduler() {
                // 5s执行一次的Scheduler
                return Scheduler.newFixedDelaySchedule(1, 5, TimeUnit.SECONDS);
            }
        };

        // 添加状态监听器
        service.addListener(new Service.Listener() {
            @Override
            public void starting() {
                System.out.println(LocalDateTime.now() + " 服务开始启动.....");
            }

            @Override
            public void running() {
                System.out.println(LocalDateTime.now() + " 服务开始运行");
            }

            @Override
            public void stopping(Service.State from) {
                System.out.println(LocalDateTime.now() + " 服务关闭中");
            }

            @Override
            public void terminated(Service.State from) {
                System.out.println(LocalDateTime.now() + " 服务终止");
            }

            @Override
            public void failed(Service.State from, Throwable failure) {
                System.out.println(LocalDateTime.now() + " 失败，cause：" + failure.getCause());
            }
        }, MoreExecutors.directExecutor());

        // 启动任务
        service.startAsync().awaitRunning();
        System.out.println(LocalDateTime.now() + " 服务状态为:" + service.state());

        // 等待30s
        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);

        // 关闭任务
        service.stopAsync().awaitTerminated();
        System.out.println(LocalDateTime.now() + " 服务状态为:" + service.state());
    }

    //AbstractExecutionThreadService
    //AbstractExecutionThreadService在单个线程中执行startup, running, and shutdown，我们必须实现run()方法，同时在方法中要能响应停止服务的请求
    @Test
    public void testAbstractExecutionThreadService() {
        // 定义我们自定义的AbstractExecutionThreadServiceImpl的类对象
        AbstractExecutionThreadService service = new AbstractExecutionThreadService() {
            private volatile boolean running = true; //声明一个状态

            @Override
            protected void startUp() {
                //TODO: 做一些初始化操作
            }

            @Override
            public void run() {
                // 具体需要实现的业务逻辑，会在线程中执行
                while (running) {
                    try {
                        // 等待2s
                        Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
                        System.out.println(LocalDateTime.now() + " do our work.....");
                    } catch (Exception e) {
                        //TODO: 处理异常，这里如果抛出异常，会使服务状态变为failed同时导致任务终止。
                    }
                }
            }

            @Override
            protected void triggerShutdown() {
                //TODO: 如果我们的run方法中有无限循环啥的，可以在这里置状态，让退出无限循环，，stopAsync()里面会调用到该方法
                running = false; //这里我们改变状态值，run方法中就能够得到响应。=
            }

            @Override
            protected void shutDown() throws Exception {
                //TODO: 可以做一些清理操作，比如关闭连接啥的。shutDown() 是在线程的具体实现里面调用的
            }
        };

        // 添加状态监听
        service.addListener(new Service.Listener() {
            @Override
            public void starting() {
                System.out.println(LocalDateTime.now() + " 服务开始启动");
            }

            @Override
            public void running() {
                System.out.println(LocalDateTime.now() + " 服务开始运行");
            }

            @Override
            public void stopping(Service.State from) {
                System.out.println(LocalDateTime.now() + " 服务关闭中");
            }

            @Override
            public void terminated(Service.State from) {
                System.out.println(LocalDateTime.now() + " 服务终止");
            }

            @Override
            public void failed(Service.State from, Throwable failure) {
                System.out.println(LocalDateTime.now() + " 失败，cause：" + failure.getCause());
            }
        }, MoreExecutors.directExecutor());

        // 启动服务
        service.startAsync().awaitRunning();
        System.out.println(LocalDateTime.now() + " 服务状态为:" + service.state());

        // 等待30s
        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);

        // 停止服务
        service.stopAsync().awaitTerminated();
        System.out.println(LocalDateTime.now() + " 服务状态为:" + service.state());
    }

    //ServiceManager
    @Test
    public void testServiceManager() {
        /*
        除了对Service接口提供基础的实现类。
        Guava还提供了 ServiceManager类使得涉及到多个Service集合的操作更加容易。
        通过实例化ServiceManager类来创建一个Service集合，你可以通过以下方法来管理它们：
            startAsync() ： 将启动所有被管理的服务。如果当前服务的状态都是NEW的话、那么你只能调用该方法一次、这跟 Service#startAsync()是一样的。
            stopAsync() ：将停止所有被管理的服务。
            addListener ：会添加一个ServiceManager.Listener，在服务状态转换中会调用该Listener
            awaitHealthy() ：会等待所有的服务达到Running状态
            awaitStopped()：会等待所有服务达到终止状态
        检测类的方法有：
            isHealthy() ：如果所有的服务处于Running状态、会返回True
            servicesByState()：以状态为索引返回当前所有服务的快照
            startupTimes() ：返回一个Map对象，记录被管理的服务启动的耗时、以毫秒为单位，同时Map默认按启动时间排序。
        我们建议整个服务的生命周期都能通过ServiceManager来管理，不过即使状态转换是通过其他机制触发的、也不影响ServiceManager方法的正确执行。
        例如：当一个服务不是通过startAsync()、而是其他机制启动时，listeners 仍然可以被正常调用、awaitHealthy()也能够正常工作。
        ServiceManager 唯一强制的要求是当其被创建时所有的服务必须处于New状态。
         */

        // 定义两个服务
        AbstractExecutionThreadServiceImpl service0 = new AbstractExecutionThreadServiceImpl();
        AbstractScheduledServiceImpl service1 = new AbstractScheduledServiceImpl();
        List<Service> serviceList = Lists.newArrayList(service0, service1);

        // ServiceManager里面管理这两个服务
        ServiceManager serviceManager = new ServiceManager(serviceList);
        // 添加监听
        serviceManager.addListener(new ServiceManager.Listener() {
            @Override
            public void healthy() {
                super.healthy();
                System.out.println(LocalDateTime.now() + " healthy");
            }

            @Override
            public void stopped() {
                super.stopped();
                System.out.println(LocalDateTime.now() + " stopped");
            }

            @Override
            public void failure(Service service) {
                super.failure(service);
                System.out.println(LocalDateTime.now() + " failure");
            }
        });

        // 启动服务，等待所有的服务都达到running状态
        serviceManager.startAsync().awaitHealthy();

        // 等待30s
        Uninterruptibles.sleepUninterruptibly(30, TimeUnit.SECONDS);

        // 停止服务
        serviceManager.stopAsync().awaitStopped();
    }

}
